/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.update;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.LongAdder;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SlowCodecReaderWrapper;
import org.apache.lucene.index.Term;
import org.apache.lucene.queries.function.ValueSource;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefHash;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrConfig.UpdateHandlerInfo;
import org.apache.solr.core.SolrCore;
import org.apache.solr.metrics.SolrMetricProducer;
import org.apache.solr.metrics.SolrMetricsContext;
import org.apache.solr.metrics.otel.instruments.AttributedInstrumentFactory;
import org.apache.solr.metrics.otel.instruments.AttributedLongCounter;
import org.apache.solr.metrics.otel.instruments.AttributedLongUpDownCounter;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.FunctionRangeQuery;
import org.apache.solr.search.QParser;
import org.apache.solr.search.QueryUtils;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.search.SolrSearcherRequirementDetector;
import org.apache.solr.search.SyntaxError;
import org.apache.solr.search.function.ValueSourceRangeFilter;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.TestInjection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <code>DirectUpdateHandler2</code> implements an UpdateHandler where documents are added directly
 * to the main Lucene index as opposed to adding to a separate smaller index.
 */
public class DirectUpdateHandler2 extends UpdateHandler
    implements SolrCoreState.IndexWriterCloser, SolrMetricProducer {

  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  // Sentinel passed to CommitTracker when no tlog-file-size bound should trigger auto-commits.
  private static final int NO_FILE_SIZE_UPPER_BOUND_PLACEHOLDER = -1;

  protected final SolrCoreState solrCoreState;

  // stats
  // Per-commit-window counters: folded into the committed* counters and reset at the end of each
  // commit (see commit()).
  LongAdder addCommands = new LongAdder();
  LongAdder deleteByIdCommands = new LongAdder();
  LongAdder deleteByQueryCommands = new LongAdder();
  // Documents added since the last hard commit; reset on commit and on rollbackWriter().
  LongAdder numDocsPending = new LongAdder();

  // Cumulative commands
  // Lifetime command counts; up-down counters because a rollback can decrease them.
  AttributedLongUpDownCounter deleteByQueryCommandsCumulative;
  AttributedLongUpDownCounter deleteByIdCommandsCumulative;
  AttributedLongUpDownCounter addCommandsCumulative;

  // Counted as soon as the corresponding command is received, before it is applied.
  AttributedLongCounter submittedAdds;
  AttributedLongCounter submittedDeleteById;
  AttributedLongCounter submittedDeleteByQuery;

  // Updated from the per-window LongAdders above when a commit completes.
  AttributedLongCounter committedAdds;
  AttributedLongCounter committedDeleteById;
  AttributedLongCounter committedDeleteByQuery;

  // Maintenance operations
  AttributedLongCounter expungeDeleteCommands;
  AttributedLongCounter mergeIndexesCommands;
  AttributedLongCounter commitCommands;
  AttributedLongCounter optimizeCommands;

  AttributedLongCounter numErrorsCumulative;

  AttributedLongCounter rollbackCommands;
  AttributedLongCounter splitCommands;
  // Observable instruments registered in createMetrics(); closed when the handler shuts down.
  List<AutoCloseable> toClose;

  // tracks when auto-commit should occur
  protected final CommitTracker commitTracker; // hard commits
  protected final CommitTracker softCommitTracker; // soft commits

  // When true, commitWithin requests are satisfied by a soft commit instead of a hard commit.
  protected boolean commitWithinSoftCommit;

  /**
   * package access for testing
   *
   * @lucene.internal
   */
  void setCommitWithinSoftCommit(boolean value) {
    this.commitWithinSoftCommit = value;
  }

  /** Creates a handler with a brand-new update log (ulog is initialized from scratch). */
  public DirectUpdateHandler2(SolrCore core) {
    super(core, null, false);

    solrCoreState = core.getSolrCoreState();

    // Configure the hard and soft auto-commit trackers from solrconfig.xml.
    final UpdateHandlerInfo cfg = core.getSolrConfig().getUpdateHandlerInfo();
    commitTracker =
        new CommitTracker(
            "Hard",
            core,
            cfg.autoCommmitMaxDocs,
            cfg.autoCommmitMaxTime,
            cfg.autoCommitMaxSizeBytes,
            cfg.openSearcher,
            false);
    softCommitTracker =
        new CommitTracker(
            "Soft",
            core,
            cfg.autoSoftCommmitMaxDocs,
            cfg.autoSoftCommmitMaxTime,
            NO_FILE_SIZE_UPPER_BOUND_PLACEHOLDER,
            true,
            true);

    commitWithinSoftCommit = cfg.commitWithinSoftCommit;

    // TLOG replicas always hard-commit with a new searcher and never soften commitWithin.
    final ZkController zkController = core.getCoreContainer().getZkController();
    if (zkController != null
        && core.getCoreDescriptor().getCloudDescriptor().getReplicaType() == Replica.Type.TLOG) {
      commitWithinSoftCommit = false;
      commitTracker.setOpenSearcher(true);
    }

    if (ulog != null) {
      initUlog(true);
    }
  }

  /** Creates a handler that takes over (and possibly reuses) the previous handler's update log. */
  public DirectUpdateHandler2(SolrCore core, UpdateHandler updateHandler) {
    super(core, updateHandler.getUpdateLog(), false);
    solrCoreState = core.getSolrCoreState();

    // Configure the hard and soft auto-commit trackers from solrconfig.xml.
    final UpdateHandlerInfo cfg = core.getSolrConfig().getUpdateHandlerInfo();
    commitTracker =
        new CommitTracker(
            "Hard",
            core,
            cfg.autoCommmitMaxDocs,
            cfg.autoCommmitMaxTime,
            cfg.autoCommitMaxSizeBytes,
            cfg.openSearcher,
            false);
    // NOTE(review): the other constructor hard-codes openSearcher=true for the soft tracker,
    // while this one uses the configured value — confirm the asymmetry is intentional.
    softCommitTracker =
        new CommitTracker(
            "Soft",
            core,
            cfg.autoSoftCommmitMaxDocs,
            cfg.autoSoftCommmitMaxTime,
            NO_FILE_SIZE_UPPER_BOUND_PLACEHOLDER,
            cfg.openSearcher,
            true);

    commitWithinSoftCommit = cfg.commitWithinSoftCommit;

    if (ulog != null) {
      // If we are reusing the existing update log, inform the log that its update handler has
      // changed. We do this as late as possible.
      // TODO: not sure _why_ we "do this as late as possible". Consider simplifying by
      //  moving `ulog.init(UpdateHandler, SolrCore)` entirely into the `UpdateHandler` ctor,
      //  avoiding the need for the `UpdateHandler(SolrCore, UpdateLog, boolean)` ctor
      //  (with the extra boolean `initUlog` param).
      initUlog(ulog != updateHandler.getUpdateLog());
    }
  }

  @Override
  public void initializeMetrics(SolrMetricsContext parentContext, Attributes attributes) {
    this.solrMetricsContext = parentContext.getChildContext(this);

    // All instruments carry the handler's category in addition to the caller's attributes.
    final Attributes coreAttributes =
        attributes.toBuilder()
            .put(AttributeKey.stringKey("category"), getCategory().toString())
            .build();

    createMetrics(
        coreAttributes,
        core.getSolrConfig().getUpdateHandlerInfo().aggregateNodeLevelMetricsEnabled);
  }

  /**
   * Creates all update-handler instruments: cumulative/submitted/committed counters plus the
   * observable gauges. Observable instruments are collected into {@link #toClose} so they can be
   * released when the handler is closed.
   */
  private void createMetrics(Attributes baseAttributes, boolean aggregateToNodeRegistry) {
    final AttributedInstrumentFactory factory =
        new AttributedInstrumentFactory(
            solrMetricsContext, baseAttributes, aggregateToNodeRegistry);

    // Cumulative command counters; up-down because a rollback can decrease them.
    final String cumulativeName = "solr_core_update_cumulative_ops";
    final String cumulativeDesc =
        "Cumulative number of update commands processed. Cumulative can decrease from rollback command";
    addCommandsCumulative =
        factory.attributedLongUpDownCounter(
            cumulativeName, cumulativeDesc, Attributes.of(OPERATION_ATTR, "adds"));
    deleteByIdCommandsCumulative =
        factory.attributedLongUpDownCounter(
            cumulativeName, cumulativeDesc, Attributes.of(OPERATION_ATTR, "deletes_by_id"));
    deleteByQueryCommandsCumulative =
        factory.attributedLongUpDownCounter(
            cumulativeName, cumulativeDesc, Attributes.of(OPERATION_ATTR, "deletes_by_query"));

    // Commit-style operations, distinguished by the operation attribute.
    final String commitOpsName = "solr_core_update_commit_ops";
    final String commitOpsDesc = "Total number of commit operations";
    commitCommands =
        factory.attributedLongCounter(
            commitOpsName, commitOpsDesc, Attributes.of(OPERATION_ATTR, "commits"));
    optimizeCommands =
        factory.attributedLongCounter(
            commitOpsName, commitOpsDesc, Attributes.of(OPERATION_ATTR, "optimize"));
    mergeIndexesCommands =
        factory.attributedLongCounter(
            commitOpsName, commitOpsDesc, Attributes.of(OPERATION_ATTR, "merge_indexes"));
    expungeDeleteCommands =
        factory.attributedLongCounter(
            commitOpsName, commitOpsDesc, Attributes.of(OPERATION_ATTR, "expunge_deletes"));

    // Maintenance operations (rollback, index split).
    final String maintenanceName = "solr_core_update_maintenance_ops";
    final String maintenanceDesc = "Total number of maintenance operations";
    rollbackCommands =
        factory.attributedLongCounter(
            maintenanceName, maintenanceDesc, Attributes.of(OPERATION_ATTR, "rollback"));
    splitCommands =
        factory.attributedLongCounter(
            maintenanceName, maintenanceDesc, Attributes.of(OPERATION_ATTR, "split"));

    numErrorsCumulative =
        factory.attributedLongCounter(
            "solr_core_update_errors", "Total number of update errors", Attributes.empty());

    // Submitted (received) operations.
    final String submittedName = "solr_core_update_submitted_ops";
    final String submittedDesc = "Total number of submitted update operations";
    submittedAdds =
        factory.attributedLongCounter(
            submittedName, submittedDesc, Attributes.of(OPERATION_ATTR, "adds"));
    submittedDeleteById =
        factory.attributedLongCounter(
            submittedName, submittedDesc, Attributes.of(OPERATION_ATTR, "deletes_by_id"));
    submittedDeleteByQuery =
        factory.attributedLongCounter(
            submittedName, submittedDesc, Attributes.of(OPERATION_ATTR, "deletes_by_query"));

    // Operations that have made it into a commit.
    final String committedName = "solr_core_update_committed_ops";
    final String committedDesc = "Total number of committed update operations";
    committedAdds =
        factory.attributedLongCounter(
            committedName, committedDesc, Attributes.of(OPERATION_ATTR, "adds"));
    committedDeleteById =
        factory.attributedLongCounter(
            committedName, committedDesc, Attributes.of(OPERATION_ATTR, "deletes_by_id"));
    committedDeleteByQuery =
        factory.attributedLongCounter(
            committedName, committedDesc, Attributes.of(OPERATION_ATTR, "deletes_by_query"));

    // Create observable metrics only for core registry
    final List<AutoCloseable> observables = new ArrayList<>();

    observables.add(
        solrMetricsContext.observableLongCounter(
            "solr_core_update_auto_commits",
            "Current number of auto commits",
            measurement -> {
              measurement.record(
                  commitTracker.getCommitCount(),
                  baseAttributes.toBuilder().put(TYPE_ATTR, "auto_commits").build());
              measurement.record(
                  softCommitTracker.getCommitCount(),
                  baseAttributes.toBuilder().put(TYPE_ATTR, "soft_auto_commits").build());
            }));

    observables.add(
        solrMetricsContext.observableLongGauge(
            "solr_core_update_commit_stats",
            "Metrics around commits",
            measurement -> {
              // Only bounds that are actually configured (> 0) are reported.
              if (commitTracker.getDocsUpperBound() > 0) {
                measurement.record(
                    commitTracker.getDocsUpperBound(),
                    baseAttributes.toBuilder().put(TYPE_ATTR, "auto_commit_max_docs").build());
              }
              if (commitTracker.getTLogFileSizeUpperBound() > 0) {
                measurement.record(
                    commitTracker.getTLogFileSizeUpperBound(),
                    baseAttributes.toBuilder().put(TYPE_ATTR, "auto_commit_max_size").build());
              }
              if (softCommitTracker.getDocsUpperBound() > 0) {
                measurement.record(
                    softCommitTracker.getDocsUpperBound(),
                    baseAttributes.toBuilder().put(TYPE_ATTR, "soft_auto_commit_max_docs").build());
              }
              if (commitTracker.getTimeUpperBound() > 0) {
                measurement.record(
                    commitTracker.getTimeUpperBound(),
                    baseAttributes.toBuilder().put(TYPE_ATTR, "auto_commit_max_time").build());
              }
              if (softCommitTracker.getTimeUpperBound() > 0) {
                measurement.record(
                    softCommitTracker.getTimeUpperBound(),
                    baseAttributes.toBuilder().put(TYPE_ATTR, "soft_auto_commit_max_time").build());
              }
            }));

    observables.add(
        solrMetricsContext.observableLongGauge(
            "solr_core_update_docs_pending_commit",
            "Current number of documents pending commit. Value is reset to 0 on commit.",
            measurement ->
                measurement.record(
                    numDocsPending.longValue(),
                    baseAttributes.toBuilder().put(OPERATION_ATTR, "docs_pending").build())));

    this.toClose = Collections.unmodifiableList(observables);
  }

  /** Removes every document from the Lucene index (the update log is untouched). */
  private void deleteAll() throws IOException {
    log.info("REMOVING ALL DOCUMENTS FROM INDEX");
    final RefCounted<IndexWriter> writerRef = solrCoreState.getIndexWriter(core);
    try {
      writerRef.get().deleteAll();
    } finally {
      writerRef.decref();
    }
  }

  /**
   * Rolls back the IndexWriter via {@link SolrCoreState}, discarding uncommitted index changes.
   * The pending-document counter is cleared first since those documents will not be committed.
   */
  protected void rollbackWriter() throws IOException {
    numDocsPending.reset();
    solrCoreState.rollbackIndexWriter(core);
  }

  /**
   * Adds a document, translating low-level indexing failures into {@link SolrException}s that
   * include the document id for easier diagnosis.
   */
  @Override
  public int addDoc(AddUpdateCommand cmd) throws IOException {
    TestInjection.injectDirectUpdateLatch();
    try {
      return addDoc0(cmd);
    } catch (SolrException e) {
      // Already carries an HTTP error code; propagate untouched.
      throw e;
    } catch (AlreadyClosedException e) {
      throw new SolrException(
          SolrException.ErrorCode.SERVER_ERROR,
          "Server error writing document id " + cmd.getPrintableId() + " to the index.",
          e);
    } catch (IllegalArgumentException iae) {
      String hint = "";
      if (iae.getCause() instanceof BytesRefHash.MaxBytesLengthExceededException) {
        hint =
            ". Perhaps the document has an indexed string field (solr.StrField) which is too large";
      }
      throw new SolrException(
          SolrException.ErrorCode.BAD_REQUEST,
          "Exception writing document id "
              + cmd.getPrintableId()
              + " to the index; possible analysis error: "
              + iae.getMessage()
              + hint,
          iae);
    } catch (RuntimeException t) {
      // A tragic IndexWriter failure is the server's fault; otherwise assume a bad document.
      final boolean tragic = core.getCoreContainer().checkTragicException(core);
      final SolrException.ErrorCode errorCode =
          tragic ? SolrException.ErrorCode.SERVER_ERROR : SolrException.ErrorCode.BAD_REQUEST;
      final String suffix = tragic ? "." : "; possible analysis error.";
      throw new SolrException(
          errorCode,
          "Exception writing document id " + cmd.getPrintableId() + " to the index" + suffix,
          t);
    }
  }

  /**
   * This is the implementation of {@link #addDoc(AddUpdateCommand)}. It is factored out to allow an
   * exception handler to decorate RuntimeExceptions with information about the document being
   * handled.
   *
   * @param cmd the command.
   * @return the count (always 1 on success; failures surface as exceptions).
   */
  private int addDoc0(AddUpdateCommand cmd) throws IOException {
    addCommands.increment();
    addCommandsCumulative.inc();
    submittedAdds.inc();

    // if there is no ID field, don't overwrite
    if (idField == null) {
      cmd.overwrite = false;
    }

    boolean success = false; // drives error accounting in the finally block
    boolean wroteToIndex = false; // true once the IndexWriter holds the document
    try {
      if ((cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
        // tlog-only add: the IndexWriter is intentionally skipped, so nothing becomes "pending".
        if (ulog != null) ulog.add(cmd);
        // BUGFIX: the previous early `return 1` left the result code at its initial -1, so the
        // finally block counted every successful tlog-only add as an error.
        success = true;
        return 1;
      }

      if (cmd.overwrite) {
        // Check for delete by query commands newer (i.e. reordered). This
        // should always be null on a leader
        List<UpdateLog.DBQ> deletesAfter = null;
        if (ulog != null && cmd.version > 0) {
          deletesAfter = ulog.getDBQNewer(cmd.version);
        }

        if (deletesAfter != null) {
          addAndDelete(cmd, deletesAfter);
        } else {
          doNormalUpdate(cmd);
        }
      } else {
        allowDuplicateUpdate(cmd);
      }
      wroteToIndex = true;

      if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
        if (commitWithinSoftCommit) {
          commitTracker.addedDocument(-1, this::getCurrentTLogSize);
          softCommitTracker.addedDocument(cmd.commitWithin);
        } else {
          softCommitTracker.addedDocument(-1);
          commitTracker.addedDocument(cmd.commitWithin, this::getCurrentTLogSize);
        }
      }

      success = true;
    } finally {
      if (!success) {
        numErrorsCumulative.inc();
      } else if (wroteToIndex) {
        // Only documents actually written to the IndexWriter count as pending commit.
        numDocsPending.increment();
      }
    }

    return 1;
  }

  /** Adds the documents without unique-key overwrite handling (duplicates are allowed). */
  private void allowDuplicateUpdate(AddUpdateCommand cmd) throws IOException {
    final RefCounted<IndexWriter> writerRef = solrCoreState.getIndexWriter(core);
    try {
      writerRef.get().addDocuments(cmd.makeLuceneDocs());
      // Log to the tlog only after the index write succeeded.
      if (ulog != null) ulog.add(cmd);
    } finally {
      writerRef.decref();
    }
  }

  /** Standard overwrite-style add: update the index, then record the command in the tlog. */
  private void doNormalUpdate(AddUpdateCommand cmd) throws IOException {
    final RefCounted<IndexWriter> writerRef = solrCoreState.getIndexWriter(core);
    try {
      updateDocOrDocValues(cmd, writerRef.get());

      // Add to the transaction log *after* the index write succeeds, so anything recorded in the
      // tlog is guaranteed to be in the index; and if a commit sneaks in between, everything in a
      // particular log version is known to have been committed.
      if (ulog != null) ulog.add(cmd);
    } finally {
      writerRef.decref();
    }
  }

  /**
   * Applies an add together with delete-by-queries that arrived out of order (newer version than
   * this add), all under the update lock so their relative order is preserved.
   */
  private void addAndDelete(AddUpdateCommand cmd, List<UpdateLog.DBQ> deletesAfter)
      throws IOException {
    // this logic is different enough from doNormalUpdate that it's separate
    log.info("Reordered DBQs detected.  Update={} DBQs={}", cmd, deletesAfter);

    // Re-parse each reordered DBQ; parse failures are logged and skipped.
    final List<Query> reorderedDbqs = new ArrayList<>(deletesAfter.size());
    for (UpdateLog.DBQ dbq : deletesAfter) {
      try {
        DeleteUpdateCommand delCmd = new DeleteUpdateCommand(cmd.req);
        delCmd.query = dbq.q;
        delCmd.version = -dbq.version;
        reorderedDbqs.add(getQuery(delCmd));
      } catch (Exception e) {
        log.error("Exception parsing reordered query : {}", dbq, e);
      }
    }

    final RefCounted<IndexWriter> writerRef = solrCoreState.getIndexWriter(core);
    try {
      final IndexWriter writer = writerRef.get();

      // see comment in deleteByQuery
      synchronized (solrCoreState.getUpdateLock()) {
        updateDocOrDocValues(cmd, writer);

        if (cmd.isInPlaceUpdate() && ulog != null) {
          ulog.openRealtimeSearcher(); // This is needed due to LUCENE-7344.
        }
        for (Query q : reorderedDbqs) {
          writer.deleteDocuments(new DeleteByQueryWrapper(q, core.getLatestSchema()));
        }
        if (ulog != null) ulog.add(cmd, true); // this needs to be protected by update lock
      }
    } finally {
      writerRef.decref();
    }
  }

  /** Notifies the auto-commit trackers of a delete, unless the command opts out of auto-commit. */
  private void updateDeleteTrackers(DeleteUpdateCommand cmd) {
    if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) != 0) {
      return;
    }

    if (commitWithinSoftCommit) {
      softCommitTracker.deletedDocument(cmd.commitWithin);
    } else {
      commitTracker.deletedDocument(cmd.commitWithin);
    }

    final var hardTimeBound = commitTracker.getTimeUpperBound();
    if (hardTimeBound > 0) {
      commitTracker.scheduleCommitWithin(hardTimeBound);
    }

    commitTracker.scheduleMaxSizeTriggeredCommitIfNeeded(this::getCurrentTLogSize);

    final var softTimeBound = softCommitTracker.getTimeUpperBound();
    if (softTimeBound > 0) {
      softCommitTracker.scheduleCommitWithin(softTimeBound);
    }
  }

  // we don't return the number of docs deleted because it's not always possible to quickly know
  // that info.
  // we don't return the number of docs deleted because it's not always possible to quickly know
  // that info.
  @Override
  public void delete(DeleteUpdateCommand cmd) throws IOException {
    TestInjection.injectDirectUpdateLatch();
    deleteByIdCommands.increment();
    deleteByIdCommandsCumulative.inc();
    submittedDeleteById.inc();

    if ((cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
      // tlog-only delete: record it in the update log and skip the IndexWriter entirely.
      if (ulog != null) ulog.delete(cmd);
      return;
    }

    final Term deleteTerm = getIdTerm(cmd.getIndexedId());
    final RefCounted<IndexWriter> writerRef = solrCoreState.getIndexWriter(core);
    try {
      writerRef.get().deleteDocuments(deleteTerm);
    } finally {
      writerRef.decref();
    }

    // Log to the tlog only after the index delete succeeded.
    if (ulog != null) ulog.delete(cmd);

    updateDeleteTrackers(cmd);
  }

  /** Removes every document from the index and, when an update log is present, from the log too. */
  public void clearIndex() throws IOException {
    deleteAll();
    if (ulog != null) {
      ulog.deleteAll();
    }
  }

  /**
   * Parses the command's query string and, when versioning is active, wraps it so that documents
   * carrying a newer version than the delete are excluded from deletion.
   */
  private Query getQuery(DeleteUpdateCommand cmd) {
    try {
      // move this higher in the stack?
      QParser parser = QParser.getParser(cmd.getQuery(), cmd.req);
      Query parsed = QueryUtils.makeQueryable(parser.getQuery());

      // Make sure not to delete newer versions
      if (ulog == null || cmd.getVersion() == 0 || cmd.getVersion() == -Long.MAX_VALUE) {
        return parsed;
      }

      SchemaField versionField = ulog.getVersionInfo().getVersionField();
      ValueSource vs = versionField.getType().getValueSource(versionField, null);
      ValueSourceRangeFilter newerVersions =
          new ValueSourceRangeFilter(
              vs, Long.toString(Math.abs(cmd.getVersion())), null, true, true);
      // formulated in the "MUST_NOT" sense so we can delete docs w/o a version (some tests depend
      // on this...)
      return new BooleanQuery.Builder()
          .add(parsed, Occur.MUST)
          .add(new FunctionRangeQuery(newerVersions), Occur.MUST_NOT)
          .build();

    } catch (SyntaxError e) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
    }
  }

  // we don't return the number of docs deleted because it's not always possible to quickly know
  // that info.
  // we don't return the number of docs deleted because it's not always possible to quickly know
  // that info.
  @Override
  public void deleteByQuery(DeleteUpdateCommand cmd) throws IOException {
    TestInjection.injectDirectUpdateLatch();
    deleteByQueryCommands.increment();
    deleteByQueryCommandsCumulative.inc();
    submittedDeleteByQuery.inc();
    boolean madeIt = false;
    try {
      Query q = getQuery(cmd);

      // Parts of the DBQ codepath run the query using a standard Lucene IndexSearcher, so block any
      // queries that we know require a SolrIndexSearcher
      final var unsupportedQDetector = new SolrSearcherRequirementDetector();
      q.visit(unsupportedQDetector);
      if (unsupportedQDetector.getRequiresSolrSearcher()) {
        throw new SolrException(
            SolrException.ErrorCode.BAD_REQUEST,
            "Query [" + cmd.getQuery() + "] is not supported in delete-by-query operations");
      }

      if ((cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
        if (ulog != null) ulog.deleteByQuery(cmd);
        madeIt = true;
        return;
      }

      boolean delAll = MatchAllDocsQuery.class == q.getClass();

      // currently for testing purposes.  Do a delete of complete index w/o worrying about versions,
      // don't log, clean up most state in update log, etc
      if (delAll && cmd.getVersion() == -Long.MAX_VALUE) {
        synchronized (solrCoreState.getUpdateLock()) {
          deleteAll();
          // BUGFIX: guard against a null ulog, consistent with every other ulog use in this class.
          if (ulog != null) ulog.deleteAll();
          // BUGFIX: mark success *before* the early return; previously the finally block counted
          // this successful delete-all as an error.
          madeIt = true;
          return;
        }
      }

      //
      // synchronized to prevent deleteByQuery from running during the "open new searcher"
      // part of a commit.  DBQ needs to signal that a fresh reader will be needed for
      // a realtime view of the index.  When a new searcher is opened after a DBQ, that
      // flag can be cleared.  If those thing happen concurrently, it's not thread safe.
      // Also, ulog.deleteByQuery clears caches and is thus not safe to be called between
      // preSoftCommit/postSoftCommit and thus we use the updateLock to prevent this (just
      // as we use around ulog.preCommit... also see comments in ulog.postSoftCommit)
      //
      synchronized (solrCoreState.getUpdateLock()) {

        // We are reopening a searcher before applying the deletes to overcome LUCENE-7344.
        // Once LUCENE-7344 is resolved, we can consider removing this.
        if (ulog != null) ulog.openRealtimeSearcher();

        if (delAll) {
          deleteAll();
        } else {
          RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
          try {
            iw.get().deleteDocuments(new DeleteByQueryWrapper(q, core.getLatestSchema()));
          } finally {
            iw.decref();
          }
        }

        if (ulog != null) ulog.deleteByQuery(cmd); // this needs to be protected by the update lock
      }

      madeIt = true;

      updateDeleteTrackers(cmd);

    } finally {
      if (!madeIt) {
        numErrorsCumulative.inc();
      }
    }
  }

  /**
   * Merges the readers supplied by the command into this core's index.
   *
   * @return 1 if anything was merged, 0 if the command carried no readers.
   */
  @Override
  public int mergeIndexes(MergeIndexesCommand cmd) throws IOException {
    TestInjection.injectDirectUpdateLatch();
    mergeIndexesCommands.inc();

    log.info("start {}", cmd);

    final List<DirectoryReader> readers = cmd.readers;
    final boolean merged = readers != null && !readers.isEmpty();
    if (merged) {
      // Wrap every leaf so addIndexes can consume the incoming readers as CodecReaders.
      final List<CodecReader> leafReaders = new ArrayList<>();
      for (DirectoryReader reader : readers) {
        for (LeafReaderContext leaf : reader.leaves()) {
          leafReaders.add(SlowCodecReaderWrapper.wrap(leaf.reader()));
        }
      }
      final RefCounted<IndexWriter> writerRef = solrCoreState.getIndexWriter(core);
      try {
        writerRef.get().addIndexes(leafReaders.toArray(new CodecReader[0]));
      } finally {
        writerRef.decref();
      }
    }
    log.info("end_mergeIndexes");

    // TODO: consider soft commit issues
    if (merged && commitTracker.getTimeUpperBound() > 0) {
      commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound());
    } else if (merged && softCommitTracker.getTimeUpperBound() > 0) {
      softCommitTracker.scheduleCommitWithin(softCommitTracker.getTimeUpperBound());
    }

    return merged ? 1 : 0;
  }

  /** Runs the two-phase-commit prepare step on the IndexWriter, stamping the commit data first. */
  public void prepareCommit(CommitUpdateCommand cmd) throws IOException {
    boolean success = false;
    try {
      log.debug("start {}", cmd);
      final RefCounted<IndexWriter> writerRef = solrCoreState.getIndexWriter(core);
      try {
        final IndexWriter writer = writerRef.get();
        SolrIndexWriter.setCommitData(writer, cmd.getVersion(), cmd.commitData);
        writer.prepareCommit();
      } finally {
        writerRef.decref();
      }

      log.debug("end_prepareCommit");
      success = true;
    } finally {
      if (!success) {
        numErrorsCumulative.inc();
      }
    }
  }

  @Override
  public void commit(CommitUpdateCommand cmd) throws IOException {
    TestInjection.injectDirectUpdateLatch();
    if (cmd.prepareCommit) {
      prepareCommit(cmd);
      return;
    }

    if (cmd.optimize) {
      optimizeCommands.inc();
    } else {
      commitCommands.inc();
      if (cmd.expungeDeletes) expungeDeleteCommands.inc();
    }

    Future<?>[] waitSearcher =
        cmd.waitSearcher ? (Future<?>[]) Array.newInstance(Future.class, 1) : null;

    boolean error = true;
    try {
      // only allow one hard commit to proceed at once
      if (!cmd.softCommit) {
        solrCoreState.getCommitLock().lock();
      }

      log.debug("start {}", cmd);

      // We must cancel pending commits *before* we actually execute the commit.

      if (cmd.openSearcher) {
        // we can cancel any pending soft commits if this commit will open a new searcher
        softCommitTracker.cancelPendingCommit();
      }
      if (!cmd.softCommit && (cmd.openSearcher || !commitTracker.getOpenSearcher())) {
        // cancel a pending hard commit if this commit is of equal or greater "strength"...
        // If the autoCommit has openSearcher=true, then this commit must have openSearcher=true
        // to cancel.
        commitTracker.cancelPendingCommit();
      }

      RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
      try {
        if (cmd.isClosingOnCommit()) {
          core.readOnly = true;
        }
        IndexWriter writer = iw.get();
        if (cmd.optimize) {
          writer.forceMerge(cmd.maxOptimizeSegments);
        } else if (cmd.expungeDeletes) {
          writer.forceMergeDeletes();
        }

        if (!cmd.softCommit) {
          synchronized (solrCoreState.getUpdateLock()) {
            // sync is currently needed to prevent preCommit from being called between preSoft and
            // postSoft... see postSoft comments.
            if (ulog != null) ulog.preCommit(cmd);
          }

          // SolrCore.verbose("writer.commit() start writer=",writer);

          if (shouldCommit(cmd, writer)) {
            SolrIndexWriter.setCommitData(writer, cmd.getVersion(), cmd.commitData);
            writer.commit();
          } else {
            log.debug("No uncommitted changes. Skipping IW.commit.");
          }

          // SolrCore.verbose("writer.commit() end");
          numDocsPending.reset();
          callPostCommitCallbacks();
        }
      } finally {
        iw.decref();
      }

      if (cmd.optimize) {
        callPostOptimizeCallbacks();
      }

      if (cmd.softCommit) {
        // ulog.preSoftCommit();
        synchronized (solrCoreState.getUpdateLock()) {
          if (ulog != null) ulog.preSoftCommit(cmd);
          core.getSearcher(true, false, waitSearcher, true);
          if (ulog != null) ulog.postSoftCommit(cmd);
        }
        callPostSoftCommitCallbacks();
      } else {
        synchronized (solrCoreState.getUpdateLock()) {
          if (ulog != null) ulog.preSoftCommit(cmd);
          if (cmd.openSearcher) {
            core.getSearcher(true, false, waitSearcher);
          } else if (!cmd.isClosingOnCommit()) {
            // force open a new realtime searcher so realtime-get and versioning code can see the
            // latest
            RefCounted<SolrIndexSearcher> searchHolder = core.openNewSearcher(true, true);
            searchHolder.decref();
          }
          if (ulog != null) ulog.postSoftCommit(cmd);
        }
        if (ulog != null) ulog.postCommit(cmd); // postCommit currently means new searcher has
        // also been opened
      }

      // reset commit tracking

      if (cmd.softCommit) {
        softCommitTracker.didCommit();
      } else {
        commitTracker.didCommit();
      }

      log.debug("end_commit_flush");

      error = false;
    } finally {
      if (!cmd.softCommit) {
        solrCoreState.getCommitLock().unlock();
      }
      committedAdds.add(addCommands.longValue());
      committedDeleteById.add(deleteByIdCommands.longValue());
      committedDeleteByQuery.add(deleteByQueryCommands.longValue());
      addCommands.reset();
      deleteByIdCommands.reset();
      deleteByQueryCommands.reset();
      if (error) {
        numErrorsCumulative.inc();
      }
    }

    // if we are supposed to wait for the searcher to be registered, then we should do it
    // outside any synchronized block so that other update operations can proceed.
    if (waitSearcher != null && waitSearcher[0] != null) {
      try {
        waitSearcher[0].get();
      } catch (InterruptedException | ExecutionException e) {
        log.error("Exception waiting for searcher", e);
      }
    }
  }

  /**
   * Determines whether the commit command should effectively trigger a commit on the index writer.
   * Invoked while holding the commit lock, as the final check before calling {@link
   * IndexWriter#commit()}.
   */
  protected boolean shouldCommit(CommitUpdateCommand cmd, IndexWriter writer) throws IOException {
    // Commit if the writer has pending changes...
    if (writer.hasUncommittedChanges()) {
      return true;
    }
    // ...or if the command carries user commit data that must still be recorded.
    return cmd.commitData != null && !cmd.commitData.isEmpty();
  }

  /**
   * Replaces this core's {@link IndexWriter} by delegating to the shared {@link SolrCoreState}.
   *
   * @param rollback passed through to {@code SolrCoreState.newIndexWriter}; presumably controls
   *     whether the old writer's uncommitted changes are rolled back rather than committed —
   *     confirm against {@code SolrCoreState}
   */
  @Override
  public void newIndexWriter(boolean rollback) throws IOException {
    solrCoreState.newIndexWriter(core, rollback);
  }

  /**
   * Rolls back all uncommitted changes. Not supported in SolrCloud mode (SOLR-4895).
   *
   * @since Solr 1.4
   */
  @Override
  public void rollback(RollbackUpdateCommand cmd) throws IOException {
    TestInjection.injectDirectUpdateLatch();
    if (core.getCoreContainer().isZooKeeperAware()) {
      throw new UnsupportedOperationException(
          "Rollback is currently not supported in SolrCloud mode. (SOLR-4895)");
    }

    rollbackCommands.inc();

    boolean success = false;
    try {
      log.info("start {}", cmd);

      rollbackWriter();

      // callPostRollbackCallbacks();

      // reset commit tracking
      commitTracker.didRollback();
      softCommitTracker.didRollback();

      log.info("end_rollback");
      success = true;
    } finally {
      // The rolled-back commands never made it into the index, so back them out of the
      // cumulative counters and clear the per-commit counters.
      addCommandsCumulative.add(-addCommands.sumThenReset());
      deleteByIdCommandsCumulative.add(-deleteByIdCommands.sumThenReset());
      deleteByQueryCommandsCumulative.add(-deleteByQueryCommands.sumThenReset());
      if (!success) {
        numErrorsCumulative.inc();
      }
    }
  }

  /**
   * Returns the {@link UpdateLog} (transaction log) used by this handler; may be null — other
   * methods in this class null-check it before use.
   */
  @Override
  public UpdateLog getUpdateLog() {
    return ulog;
  }

  /**
   * Closes this update handler: shuts down both commit trackers, quietly closes {@code toClose},
   * resets the pending-docs counter, then closes the superclass.
   *
   * @throws IOException wrapping any failure raised while closing the superclass
   */
  @Override
  public void close() throws IOException {
    log.debug("closing {}", this);

    // Close the trackers first so no further auto-commit activity is triggered while closing.
    commitTracker.close();
    softCommitTracker.close();
    IOUtils.closeQuietly(toClose);
    numDocsPending.reset();
    try {
      super.close();
    } catch (Exception e) {
      // Normalize any superclass failure into an IOException, preserving the cause.
      throw new IOException("Error closing", e);
    }
  }

  /**
   * IndexWriterCloser interface method - called from solrCoreState.decref(this).
   *
   * <p>Attempts a final hard commit (when the ulog has uncommitted changes and is ACTIVE), then
   * closes the update log and finally the writer itself. Commit/close errors other than {@link
   * OutOfMemoryError} are logged and swallowed so the close sequence can proceed.
   */
  @Override
  public void closeWriter(IndexWriter writer) throws IOException {
    log.trace("closeWriter({}): ulog={}", writer, ulog);

    assert TestInjection.injectNonGracefullClose(core.getCoreContainer());

    boolean clearRequestInfo = false;

    // A request context is needed for the commit path below; install a synthetic one only if
    // the calling thread doesn't already have one, and remember to clear it afterwards.
    SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
    SolrQueryResponse rsp = new SolrQueryResponse();
    if (SolrRequestInfo.getRequestInfo() == null) {
      clearRequestInfo = true;
      SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp)); // important for debugging
    }
    try {

      if (TestInjection.injectSkipIndexWriterCommitOnClose(writer)) {
        // if this TestInjection triggers, we do some simple rollback()
        // (which closes the underlying IndexWriter) and then return immediately
        log.warn("Skipping commit for IndexWriter.close() due to TestInjection");
        if (writer != null) {
          writer.rollback();
        }
        // we shouldn't close the transaction logs either, but leaving them open
        // means we can't delete them on windows (needed for tests)
        if (ulog != null) ulog.close(false);

        return;
      }

      // do a commit before we quit?
      boolean tryToCommit =
          writer != null
              && ulog != null
              && ulog.hasUncommittedChanges()
              && ulog.getState() == UpdateLog.State.ACTIVE;

      // be tactical with this lock! closing the updatelog can deadlock when it tries to commit
      solrCoreState.getCommitLock().lock();
      try {
        try {
          if (log.isInfoEnabled()) {
            log.info(
                "Committing on IndexWriter.close() {}.",
                (tryToCommit ? "" : " ... SKIPPED (unnecessary)"));
          }
          if (tryToCommit) {
            // Hard commit with no searcher work: nothing will query this core again.
            CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
            cmd.openSearcher = false;
            cmd.waitSearcher = false;
            cmd.softCommit = false;

            // TODO: keep other commit callbacks from being called?
            // this.commit(cmd); // too many test failures using this method... is it because of
            // callbacks?

            synchronized (solrCoreState.getUpdateLock()) {
              ulog.preCommit(cmd);
            }

            // todo: refactor this shared code (or figure out why a real CommitUpdateCommand can't
            // be used)
            if (shouldCommit(cmd, writer)) {
              SolrIndexWriter.setCommitData(writer, cmd.getVersion(), cmd.commitData);
              writer.commit();
            }

            synchronized (solrCoreState.getUpdateLock()) {
              ulog.postCommit(cmd);
            }
          }
        } catch (Throwable th) {
          // Swallow commit failures (except OOM) so the writer/ulog still get closed below.
          log.error("Error in final commit", th);
          if (th instanceof OutOfMemoryError) {
            throw (OutOfMemoryError) th;
          }
        }

      } finally {
        solrCoreState.getCommitLock().unlock();
      }
    } finally {
      if (clearRequestInfo) SolrRequestInfo.clearRequestInfo();
    }
    // we went through the normal process to commit, so we don't have to artificially
    // cap any ulog files.
    try {
      if (ulog != null) ulog.close(false);
    } catch (Throwable th) {
      log.error("Error closing log files", th);
      if (th instanceof OutOfMemoryError) {
        throw (OutOfMemoryError) th;
      }
    }

    if (writer != null) {
      writer.close();
    }
  }

  /**
   * Splits the index per {@code cmd}, after first committing so the splitter sees all pending
   * updates. Split results are attached to the command's response; an {@link IOException} is
   * counted as an error and rethrown.
   */
  @Override
  public void split(SplitIndexCommand cmd) throws IOException {
    // Make sure everything pending is flushed to the index before splitting.
    commit(new CommitUpdateCommand(cmd.req, false));
    SolrIndexSplitter splitter = new SolrIndexSplitter(cmd);
    splitCommands.inc();
    NamedList<Object> splitResults = new NamedList<>();
    try {
      splitter.split(splitResults);
      cmd.rsp.addResponse(splitResults);
    } catch (IOException e) {
      numErrorsCumulative.inc();
      throw e;
    }
  }

  /**
   * Calls either {@link IndexWriter#updateDocValues} or <code>IndexWriter#updateDocument</code>(s)
   * as needed based on {@link AddUpdateCommand#isInPlaceUpdate}.
   *
   * <p>If this is an UPDATE_INPLACE cmd, then all fields included in {@link
   * AddUpdateCommand#makeLuceneDocForInPlaceUpdate} must either be the uniqueKey field, or be
   * DocValue only fields.
   *
   * @param cmd - cmd apply to IndexWriter
   * @param writer - IndexWriter to use
   */
  private void updateDocOrDocValues(AddUpdateCommand cmd, IndexWriter writer) throws IOException {
    // A uniqueKey field is required so any existing doc can be located and replaced.
    assert idField != null;
    final boolean dedupe = cmd.updateTerm != null; // updateTerm is the dedupe mechanism

    if (cmd.isInPlaceUpdate()) {
      if (dedupe) {
        throw new IllegalStateException(
            "cmd.updateTerm/dedupe is not compatible with in-place updates");
      }
      // Nested child docs in the solrInputDoc are also unsupported here; attempting it will
      // result in an exception from the calls below.

      // cmd.getIndexedId() would resolve to the root doc if this doc is a child, so build the
      // term from this doc's own (possibly nested) id instead.
      final String indexedId =
          core.getLatestSchema().indexableUniqueKey(cmd.getSelfOrNestedDocIdStr());
      final Term updateTerm = new Term(idField.getName(), indexedId);
      // makeLuceneDocForInPlaceUpdate skips uniqueKey and _root_
      final List<IndexableField> dvFields = cmd.makeLuceneDocForInPlaceUpdate().getFields();
      log.debug("updateDocValues({})", cmd);
      writer.updateDocValues(updateTerm, dvFields.toArray(new Field[0]));

    } else { // the common full-document path

      final Iterable<Document> luceneDocs = cmd.makeLuceneDocs();
      final Term idTerm = getIdTerm(cmd.getIndexedId());
      final Term updateTerm = dedupe ? cmd.updateTerm : idTerm;

      log.debug("updateDocuments({})", cmd);
      writer.updateDocuments(updateTerm, luceneDocs);

      // With dedupe, purge any older documents that share this ID but not the updateTerm of the
      // doc just added (near-duplicate replacement).
      if (dedupe) { // rare
        final BooleanQuery.Builder dupesQuery = new BooleanQuery.Builder();
        dupesQuery.add(new TermQuery(updateTerm), Occur.MUST_NOT); // keep the doc added above
        dupesQuery.add(new TermQuery(idTerm), Occur.MUST); // same ID
        writer.deleteDocuments(
            new DeleteByQueryWrapper(dupesQuery.build(), core.getLatestSchema()));
      }
    }
  }

  /**
   * Builds the term used to address a document by ID: the {@code _root_} field when the schema
   * supports child documents, otherwise the uniqueKey field.
   */
  private Term getIdTerm(BytesRef termVal) {
    final String fieldName =
        core.getLatestSchema().isUsableForChildDocs()
            ? IndexSchema.ROOT_FIELD_NAME
            : idField.getName();
    return new Term(fieldName, termVal);
  }

  /////////////////////////////////////////////////////////////////////
  // SolrInfoBean stuff: Statistics and Module Info
  /////////////////////////////////////////////////////////////////////

  /** SolrInfoBean: reports this handler's name as the fixed class name (stable for subclasses). */
  @Override
  public String getName() {
    return DirectUpdateHandler2.class.getName();
  }

  /** SolrInfoBean: human-readable description of this update handler. */
  @Override
  public String getDescription() {
    return "Update handler that efficiently directly updates the on-disk main lucene index";
  }

  /** Returns the shared {@link SolrCoreState} that owns the IndexWriter and commit/update locks. */
  @Override
  public SolrCoreState getSolrCoreState() {
    return solrCoreState;
  }

  /**
   * Returns the current transaction-log size when the ulog exists and has uncommitted changes;
   * otherwise -1.
   */
  private long getCurrentTLogSize() {
    if (ulog == null || !ulog.hasUncommittedChanges()) {
      return -1;
    }
    return ulog.getCurrentLogSizeFromStream();
  }

  /** Exposes the hard-commit tracker; public only to allow access for tests. */
  public CommitTracker getCommitTracker() {
    return commitTracker;
  }

  /** Exposes the soft-commit tracker; public only to allow access for tests. */
  public CommitTracker getSoftCommitTracker() {
    return softCommitTracker;
  }
}
